add moriio transfer engine#1742

Open
inkcherry wants to merge 41 commits into
vllm-project:mainfrom
inkcherry:moriio

@inkcherry

@inkcherry inkcherry commented Mar 9, 2026

Purpose

Add MoriTransferEngineConnector — a new OmniConnector backend using Mori RDMA transfer engine for zero-copy data transfers between disaggregated pipeline stages. The implementation follows a similar architecture to MooncakeTransferEngineConnector, adapted to Mori's IOEngine / MemoryDesc / EngineDesc API.

  • RDMA data plane via Mori IOEngine.batch_write() with async TransferStatus tracking
  • ZMQ control plane with msgspec-encoded pull-request / query handshakes
  • Pinned-CPU or CUDA memory pool with first-fit allocator and TTL-based cleanup
  • Raw torch.Tensor / bytes fast path bypassing serialization
  • Graceful fallback when mori is not installed

Changes include the connector implementation, factory registration, module exports, design doc, and example stage config YAML.
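To make the "first-fit allocator" bullet concrete, here is a minimal sketch of first-fit allocation over a fixed-size pool. It tracks offsets only and is not the connector's `BufferAllocator`; the class name `FirstFitAllocator` and its methods are hypothetical, chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class FirstFitAllocator:
    """Toy first-fit allocator over a pool of `size` bytes (offsets only)."""

    size: int
    # Free list of (offset, length) holes, kept sorted by offset.
    free: list[tuple[int, int]] = field(default_factory=list)

    def __post_init__(self) -> None:
        self.free = [(0, self.size)]

    def alloc(self, length: int) -> int:
        """Return the offset of the first hole that fits, or raise MemoryError."""
        if length <= 0:
            raise ValueError("length must be positive")
        for i, (off, hole) in enumerate(self.free):
            if hole >= length:
                if hole == length:
                    del self.free[i]
                else:
                    # Carve the allocation from the front of the hole.
                    self.free[i] = (off + length, hole - length)
                return off
        raise MemoryError(f"no free hole of {length} bytes")

    def release(self, offset: int, length: int) -> None:
        """Return a region to the free list and merge adjacent holes."""
        self.free.append((offset, length))
        self.free.sort()
        merged: list[tuple[int, int]] = []
        for off, ln in self.free:
            if merged and merged[-1][0] + merged[-1][1] == off:
                merged[-1] = (merged[-1][0], merged[-1][1] + ln)
            else:
                merged.append((off, ln))
        self.free = merged


pool = FirstFitAllocator(size=1024)
a = pool.alloc(256)   # takes the front of the pool
b = pool.alloc(512)   # placed right after `a`
pool.release(a, 256)  # leaves a hole at the front
c = pool.alloc(128)   # first fit reuses that front hole
```

First fit is cheap to implement and scans holes in address order, at the cost of some fragmentation when allocation sizes vary widely.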

Test Plan

Hardware: 3x AMD Instinct MI300X nodes (8 GPUs each), Mellanox ConnectX-7 400Gbps RoCE NIC (mlx5_0).

3-node disaggregated serving with Qwen2.5-Omni-7B, each stage on a separate node with TP=2:

# Node 1 — Thinker (stage 0)
vllm serve Qwen/Qwen2.5-Omni-7B --omni \
    --port 8000 \
    --tensor-parallel-size 2 \
    --stage-configs-path qwen2_5_omni_mori.yaml \
    --stage-id 0 --log-stats \
    -oma <orchestrator_ip> -omp 9000

# Node 2 — Talker (stage 1)
vllm serve Qwen/Qwen2.5-Omni-7B --omni \
    --tensor-parallel-size 2 \
    --stage-configs-path qwen2_5_omni_mori.yaml \
    --stage-id 1 --headless --log-stats \
    -oma <orchestrator_ip> -omp 9000

# Node 3 — Code2Wav (stage 2)
vllm serve Qwen/Qwen2.5-Omni-7B --omni \
    --tensor-parallel-size 2 \
    --stage-configs-path qwen2_5_omni_mori.yaml \
    --stage-id 2 --headless --log-stats \
    -oma <orchestrator_ip> -omp 9000

Benchmark client:

python openai_chat_completion_client_for_multimodal_generation.py --query-type text --port 8000

Test Result

0→1 (Thinker → Talker)

| Field | Mooncake | Mori |
| --- | --- | --- |
| in_flight_time_ms | 1.389 | 0.829 |
| rx_decode_time_ms | 2.243 | 2.541 |
| size_kbytes | 1,611.713 | 1,611.713 |
| tx_time_ms | 0.914 | 0.926 |

1→2 (Talker → Code2Wav)

| Field | Mooncake | Mori |
| --- | --- | --- |
| in_flight_time_ms | 1.291 | 0.510 |
| rx_decode_time_ms | 1.381 | 1.983 |
| size_kbytes | 3.305 | 3.305 |
| tx_time_ms | 0.362 | 0.350 |

0→1 (Thinker → Talker)(intranode communication)

| Field | Mooncake | Mori |
| --- | --- | --- |
| in_flight_time_ms | 1.523 | 1.153 |
| rx_decode_time_ms | 2.453 | 1.887 |
| size_kbytes | 1,611.713 | 1,611.713 |
| tx_time_ms | 0.977 | 1.069 |

1→2 (Talker → Code2Wav)(intranode communication)

| Field | Mooncake | Mori |
| --- | --- | --- |
| in_flight_time_ms | 1.566 | 1.927 |
| rx_decode_time_ms | 1.186 | 1.327 |
| size_kbytes | 3.305 | 3.305 |
| tx_time_ms | 0.316 | 0.341 |

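Mori shows lower in-flight latency than Mooncake in three of the four measured configurations (the exception is the intra-node Talker → Code2Wav edge, 1.927 ms vs. 1.566 ms). tx times are within about 0.1 ms of each other, and rx decode times are mixed (Mori is slower in three cases and faster in one). The correctness of the generated audio output was also verified manually by listening to the results. Mori provides a viable RDMA backend option for AMD devices.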
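For readers who want the relative differences rather than raw milliseconds, the snippet below computes the percentage change in `in_flight_time_ms` from Mooncake to Mori. The numbers are copied from the four tables above; the edge labels and the helper name `relative_change` are only illustrative.

```python
# in_flight_time_ms values copied from the tables above: (Mooncake, Mori).
in_flight_ms = {
    "0->1 (3-node run)": (1.389, 0.829),
    "1->2 (3-node run)": (1.291, 0.510),
    "0->1 (intra-node)": (1.523, 1.153),
    "1->2 (intra-node)": (1.566, 1.927),
}


def relative_change(baseline: float, candidate: float) -> float:
    """Percentage change of `candidate` relative to `baseline` (negative = faster)."""
    return (candidate - baseline) / baseline * 100.0


changes = {edge: relative_change(*pair) for edge, pair in in_flight_ms.items()}

for edge, pct in changes.items():
    print(f"{edge}: {pct:+.1f}%")
```

Each row is a single measurement, so the percentages indicate direction and rough magnitude only and are not a statistically robust comparison.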

@inkcherry inkcherry requested a review from hsliuustc0106 as a code owner March 9, 2026 06:14
@inkcherry inkcherry marked this pull request as draft March 9, 2026 06:15
@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 30142a6a0a

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

[[src_offset]],
[remote_mem],
[[pull.dst_offset]],
[[pull.length]],

P1 Badge Use source buffer size for RDMA write length

The sender already reads src_size from its own buffer table, but the transfer length is taken from pull.length (receiver-provided metadata). If that metadata is stale or malformed, the sender can truncate a valid payload (when pull.length < src_size) or attempt to read beyond the stored buffer (when pull.length > src_size) while still treating the operation as a normal transfer path. The write length should be validated against src_size (or derived solely from src_size) before calling batch_write.
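A minimal sketch of the check this comment asks for, assuming the strictest policy (reject any mismatch instead of clamping). The helper name `checked_transfer_length` is hypothetical; the real connector may validate differently or derive the length from `src_size` alone.

```python
def checked_transfer_length(src_size: int, requested: int) -> int:
    """Validate a receiver-supplied length against the sender's own buffer size.

    Hypothetical helper: rejects lengths that would truncate the payload
    (requested < src_size) or read past the stored buffer (requested > src_size).
    """
    if src_size <= 0:
        raise ValueError(f"invalid source size: {src_size}")
    if requested != src_size:
        raise ValueError(
            f"pull length {requested} does not match stored buffer size {src_size}"
        )
    return requested


# A matching request passes through unchanged.
ok_length = checked_transfer_length(src_size=4096, requested=4096)

# A stale or malformed request is rejected before any RDMA write is issued.
try:
    checked_transfer_length(src_size=4096, requested=8192)
    rejected = False
except ValueError:
    rejected = True
```

Rejecting the request keeps the failure visible to the receiver; silently clamping to `src_size` would hide stale metadata.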


@inkcherry inkcherry changed the title add moriio backend add moriio transfer engine Mar 11, 2026
inkcherry and others added 3 commits March 17, 2026 10:28
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
Signed-off-by: zejwang <zejwang@amd.com>

Signed-off-by: zejwang  <zejwang@amd.com>
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
@inkcherry inkcherry marked this pull request as ready for review March 17, 2026 13:59
@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ed7bde1f46


Comment on lines +793 to +797
if not bound:
    try:
        sock.bind(f"tcp://{self.host}:*")
        bound = True
    except zmq.ZMQError as exc:

P2 Badge Fail fast instead of rebinding sender ZMQ to random port

When bind on self.zmq_port fails with EADDRINUSE, this code silently rebinds to an OS-assigned port, but the metadata-less receive path still resolves the sender using configured sender_host/sender_zmq_port (see get() -> _query_metadata_from_sender(), and call sites like kv_transfer_manager.receive_kv_cache_for_request and chunk_transfer_adapter._poll_single_request that call connector.get(..., metadata=None)). In that context, receivers keep querying the old configured port and time out indefinitely even though the sender is running, so this should fail fast or explicitly propagate the new port to receivers.
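The fail-fast behaviour suggested here can be sketched with the standard-library `socket` module standing in for ZMQ, so the example runs without pyzmq. The function name `bind_fail_fast` is hypothetical; the point is that a bind conflict is raised to the caller rather than hidden behind a random port that receivers never learn about.

```python
import socket


def bind_fail_fast(host: str, port: int) -> socket.socket:
    """Bind to exactly (host, port) or raise; never fall back to another port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
    except OSError as exc:
        sock.close()
        # Surface the conflict: receivers are configured with this port,
        # so silently moving elsewhere would make them time out.
        raise RuntimeError(f"cannot bind {host}:{port}: {exc}") from exc
    return sock


# Occupy an OS-assigned port, then try to bind the same port a second time.
first = bind_fail_fast("127.0.0.1", 0)
port = first.getsockname()[1]
try:
    bind_fail_fast("127.0.0.1", port)
    rebound = True
except RuntimeError:
    rebound = False
finally:
    first.close()
```

The alternative mentioned in the comment, propagating the newly chosen port to receivers, would need an extra discovery step that this sketch does not cover.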


Comment on lines +766 to +767
stale = [k for k, v in self._local_buffers.items() if now - v[5] > _BUFFER_TTL_SECONDS]
for k in stale:

P2 Badge Exclude in-flight buffers from TTL reclamation

TTL cleanup reclaims entries purely by age, but _handle_pull_request() only reads _local_buffers under lock and then performs batch_write(...); st.Wait() without marking that entry in-flight; if a receiver pulls a buffer near/after the 300s TTL, the listener thread can reclaim and release that allocator region concurrently while the RDMA write is still pending. This creates a race that can corrupt transfers or cause intermittent failures under delayed pulls, so stale-buffer GC needs an in-flight guard (or timestamp refresh) before transfer starts.
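One way to add the in-flight guard described above is a per-entry counter that TTL cleanup must respect. This is a sketch under assumptions: `BufferTable` and its methods are hypothetical stand-ins for the connector's `_local_buffers` handling, and timestamps are passed in explicitly to keep the example deterministic.

```python
import threading
from dataclasses import dataclass


@dataclass
class _Entry:
    created_at: float
    in_flight: int = 0


class BufferTable:
    """Hypothetical buffer table whose TTL cleanup skips in-flight entries."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._lock = threading.Lock()
        self._entries: dict[str, _Entry] = {}

    def put(self, key: str, now: float) -> None:
        with self._lock:
            self._entries[key] = _Entry(created_at=now)

    def begin_transfer(self, key: str) -> bool:
        """Pin the entry before the RDMA write; False if it is already gone."""
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return False
            entry.in_flight += 1
            return True

    def end_transfer(self, key: str) -> None:
        """Unpin the entry once the transfer has completed or failed."""
        with self._lock:
            self._entries[key].in_flight -= 1

    def reclaim_stale(self, now: float) -> list[str]:
        """Drop entries older than the TTL that have no transfer in flight."""
        with self._lock:
            stale = [
                key
                for key, entry in self._entries.items()
                if now - entry.created_at > self._ttl and entry.in_flight == 0
            ]
            for key in stale:
                del self._entries[key]
            return stale


table = BufferTable(ttl_seconds=300.0)
table.put("req-1", now=0.0)
table.put("req-2", now=0.0)
pinned = table.begin_transfer("req-1")           # a pull starts near the TTL
reclaimed_during = table.reclaim_stale(now=301.0)  # only the idle entry goes
table.end_transfer("req-1")
reclaimed_after = table.reclaim_stale(now=301.0)   # now the pinned one can go
```

In real code the `end_transfer` call belongs in a `finally` block around the write-and-wait step so that a failed transfer does not pin the buffer forever.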


@hongxiayang
Contributor

thanks @inkcherry for the PR.

@tjtanaa
Member

tjtanaa commented Mar 19, 2026

@inkcherry Is MoRII beneficial for intra-node communication, and will you enable it there? vLLM-Omni also has many single-node deployment use cases.

@inkcherry
Author

inkcherry commented Mar 24, 2026

> @inkcherry is MoRII beneficial for intranode communication? will you enable it for intranode communication? Because vLLM-Omni also has many use cases on single node deployment.

Hi, @tjtanaa, I've added intra-node benchmark results. Mori delivers correct outputs with competitive performance.

@junkang1991

Hi @inkcherry, thanks for the PR. We tried to test with Qwen/Qwen2.5-Omni-7B using the provided qwen2_5_omni_mori.yaml (for intra-node communication), but we couldn't confirm that Mori is actually being used. The serve log only shows:

Loaded OmniTransferConfig with 2 connector configurations

There's no mention of MoriTransferEngineConnector being initialized anywhere, so it's hard to tell if Mori is active or if something is silently falling back.

Would you be able to share the reproduction steps you used to verify this?

@inkcherry
Author

inkcherry commented Mar 26, 2026

Hi @junkang1991, thanks for trying it out. During the initialization phase I see the log below, which shows that XGMI is automatically selected as the Mori backend.

[image: screenshot of the initialization log]

I have added logs wrapping st.Wait() in mori_transfer_engine_connector.py. You can also use this method to check transfer details.

            success = True
            for i, st in enumerate(statuses):
                logger.info(
                    "[MORI TRANSFER] request=%s: TransferStatus[%d] pre-Wait "
                    "code=%s init=%s in_progress=%s",
                    pull.request_id,
                    i,
                    st.Code(),
                    st.Init(),
                    st.InProgress(),
                )
                _wait_t0 = _time_mod.perf_counter()
                st.Wait()
                _wait_ms = (_time_mod.perf_counter() - _wait_t0) * 1000
                logger.info(
                    "[MORI TRANSFER] request=%s: TransferStatus[%d] post-Wait "
                    "wait_ms=%.3f succeeded=%s failed=%s code=%s msg=%r",
                    pull.request_id,
                    i,
                    _wait_ms,
                    st.Succeeded(),
                    st.Failed(),
                    st.Code(),
                    st.Message(),
                )

If you still encounter issues, could you try updating Mori to a newer commit?

…node XGMI

Signed-off-by: junkang1991 <junkangchow@gmail.com>
@tjtanaa
Member

tjtanaa commented Apr 1, 2026

@inkcherry @junkang1991 please fix the readthedocs and pre-commit error.

@inkcherry
Author

@tjtanaa, thanks for the reminder, fixed. cc @junkang1991

@inkcherry
Author

inkcherry commented Apr 10, 2026

The refactor in #1908 removed multi-node deployment from the vllm-omni codebase. Single-node deployment still works. I think we can move forward with the PR at this stage; I've removed the inter-node configuration script for now. Once the vllm-omni codebase supports inter-node deployment again, we will re-test and add it back.

Signed-off-by: inkcherry <mingzhi.liu@amd.com>

## When to Use

Best for high-performance multi-node data transfer using Mori RDMA transfer engine.
Member

Can you update the description for now, stating the reason given in #1742 (comment), and mention that intra-node is currently supported?

Author

yes, updated.

Signed-off-by: inkcherry <mingzhi.liu@amd.com>
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
knitcapcat and others added 5 commits April 29, 2026 21:48
Clarifies how to opt this deploy yaml into true AMD Infinity Fabric
XGMI via the HIP transport landed in Mooncake PRs vllm-project#1742 and vllm-project#1550,
and pins the default value to ``protocol: "rdma"`` — the path that is
actually end-to-end validated on this branch (three consecutive text
completions at 18.7 / 16.8 / 16.4 s with 93 per-chunk RDMA GETs).

XGMI opt-in requires three things in lock step and is therefore not
the default:

1. A mooncake wheel rebuilt with ``-DUSE_HIP=ON`` and reinstalled into
   the container's Python env (stock wheels don't ship HIP transport).
2. ``memory_pool_device: "cuda"`` so Mooncake picks the HIP allocator.
3. ``MC_FORCE_MNNVL=1`` in the launch environment so
   ``TransferEngineImpl::init()`` installs the HIP transport instead
   of defaulting back to RDMA whenever the node has HCAs (the current
   auto-topology path treats RDMA as the preferred fabric, even when
   HIP is compiled).

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
Pure cosmetic reformatting requested by ruff-format:

- mooncake_transfer_engine_connector.py: collapse a 2-line f-string into
  one line.
- mori_transfer_engine_connector.py: same.
- utils/initialization.py: drop redundant indentation in the
  `_ROLE_BOUND_ZMQ_CONNECTORS` frozenset literal and collapse two list
  comprehensions in `get_connectors_config_for_stage` to a single line.

Verified locally with `pre-commit run --all-files` (all hooks pass).

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
- P1 length: validate pull.length against src_size before batch_write
  (prevents silent truncation / cross-allocation OOB read).
- P2 port: drop EADDRINUSE silent fallback; match Mooncake's fail-fast.
- P2 TTL: TODO mirroring wzliu's note; same race deferred to follow-up.

Co-authored-by: Cursor <cursoragent@cursor.com>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
@knitcapcat-amd

> @inkcherry Could you submit a topic about this PR in the sync meeting? If you're at UTC+8 Timezone, you could submit it here: tinyurl.com/vllm-omni-meeting.

Hi @gcanlin, we plan to share this PR this Wednesday. Does this meeting require any registration before the sharing session?



class ManagedBuffer:
"""Zero-copy view into the global memory pool.
Contributor

These classes (ManagedBuffer, BufferAllocator) with the same functionality as Mooncake TE connector have been moved to omni_connectors/utils/memory_pool.py. It's suggested to use those classes.


Thanks for catching this! Done in afbf08e; Mori now imports BufferAllocator / ManagedBuffer from omni_connectors/utils/memory_pool.

except Exception as e:
logger.warning(f"Failed to terminate ZMQ context: {e}")

self.pool = None # type: ignore[assignment]
Contributor

@natureofnature natureofnature May 21, 2026

Can Mori unregister the pinned memory and shut down the engine during cleanup?


Thanks for the suggestion! Done in b6daf0a: close() now calls deregister_memory + deregister_remote_engine for each registered peer, then drops the IOEngine ref so GC runs the C++ destructor (mori 1.1.2 doesn't expose an explicit shutdown()).

@natureofnature
Contributor

natureofnature commented May 21, 2026

Dual mode / chunk-transfer endpoints are not rank-aware: _inject_chunk_path_endpoints() derives ports as base_port + stage_id, with no TP rank term. If any stage runs with tensor_parallel_size > 1, same-stage workers will bind the same ZMQ port, and heterogeneous TP cannot route per-rank shards because the connector only has a single upstream sender_host/sender_zmq_port. I suggest failing fast for TP>1 on this path, or enforcing that dual chunk transfer is TP=1-only.
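The collision is easy to see in a small sketch. The first function mirrors the `base_port + stage_id` derivation described above; the second is one possible rank-aware variant and is purely hypothetical, as are the names `stage_only_port`, `rank_aware_port`, and the `RANK_PORT_STRIDE` value. It is not what this PR implements.

```python
# Hypothetical stride: upper bound on TP ranks per stage sharing one host.
RANK_PORT_STRIDE = 16


def stage_only_port(base_port: int, stage_id: int) -> int:
    """Port derivation as described in the comment: no TP rank term."""
    return base_port + stage_id


def rank_aware_port(base_port: int, stage_id: int, tp_rank: int) -> int:
    """Illustrative alternative that reserves a block of ports per stage."""
    if not 0 <= tp_rank < RANK_PORT_STRIDE:
        raise ValueError(f"tp_rank {tp_rank} outside [0, {RANK_PORT_STRIDE})")
    return base_port + stage_id * RANK_PORT_STRIDE + tp_rank


base = 50000
# Two TP ranks of stage 1 on the same host.
stage_only = {stage_only_port(base, 1) for _rank in range(2)}
rank_aware = {rank_aware_port(base, 1, rank) for rank in range(2)}

print(sorted(stage_only))  # a single port: both ranks would try to bind it
print(sorted(rank_aware))  # two distinct ports, one per rank
```

Distinct ports alone do not solve per-rank routing on the receiver side, which is why failing fast for TP>1 is the smaller change.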

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Mirrors the Mooncake teardown flow: explicit ``deregister_memory`` for
the pinned pool plus ``deregister_remote_engine`` for every remote
registered via ``_ensure_remote_registered``, then drop the only
Python reference so the IOEngine C++ destructor unwinds backend and
RDMA fabric state under Python GC.  ``mori.io.IOEngine`` does not
expose an explicit ``shutdown()``/``close()`` entry-point (confirmed
on mori 1.1.2.dev30, ``dir(mori.io.IOEngine)`` has no
shutdown/close/destroy/stop/cleanup/terminate), so the
drop-ref-and-GC approach is the supported teardown path.

``deregister_remote_engine`` is typed
``(self, engine_desc: libmori_pybinds.EngineDesc)`` in the pybind
binding, not a key string, so ``_registered_engines`` now stores
``{key -> EngineDesc}`` instead of ``set[str]`` and ``close()``
passes the actual desc back.  Without this every teardown would
hit ``TypeError: incompatible function arguments`` that the
surrounding ``try/except`` swallowed silently and the remote
engines would never be deregistered.
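The registry change described in this commit message can be sketched with stand-in classes. `EngineDesc` and `FakeIOEngine` below are hypothetical substitutes for the pybind types (mori itself is not imported), and `ConnectorSketch` only mimics the teardown order; the `isinstance` check emulates the pybind type error mentioned above.

```python
class EngineDesc:
    """Stand-in for the pybind EngineDesc; only carries a key here."""

    def __init__(self, key: str) -> None:
        self.key = key


class FakeIOEngine:
    """Stand-in engine that, like the binding, only accepts EngineDesc objects."""

    def __init__(self) -> None:
        self.remotes: dict[str, EngineDesc] = {}

    def register_remote_engine(self, desc: EngineDesc) -> None:
        self.remotes[desc.key] = desc

    def deregister_remote_engine(self, desc: EngineDesc) -> None:
        if not isinstance(desc, EngineDesc):
            raise TypeError("incompatible function arguments")
        self.remotes.pop(desc.key, None)


class ConnectorSketch:
    def __init__(self, engine: FakeIOEngine) -> None:
        self.engine: FakeIOEngine | None = engine
        # key -> EngineDesc, so close() can hand the desc back to the engine.
        self._registered_engines: dict[str, EngineDesc] = {}

    def ensure_remote_registered(self, desc: EngineDesc) -> None:
        if self.engine is not None and desc.key not in self._registered_engines:
            self.engine.register_remote_engine(desc)
            self._registered_engines[desc.key] = desc

    def close(self) -> None:
        if self.engine is None:
            return
        for desc in list(self._registered_engines.values()):
            self.engine.deregister_remote_engine(desc)
        self._registered_engines.clear()
        self.engine = None  # drop the reference so the engine can be collected


engine = FakeIOEngine()
conn = ConnectorSketch(engine)
conn.ensure_remote_registered(EngineDesc("peer-a"))
conn.ensure_remote_registered(EngineDesc("peer-b"))
registered_before = sorted(engine.remotes)
conn.close()
```

Storing only the key strings would force `close()` to pass a `str`, which is the call the stand-in engine rejects.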

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
knitcapcat and others added 2 commits May 26, 2026 07:42
Conflicts resolved:
  - vllm_omni/distributed/omni_connectors/utils/initialization.py:
    keep both new symbols; main's ``KV_REPLICA_PORT_STRIDE`` (semantic
    sibling of ``KV_RANK_PORT_STRIDE``) placed immediately after it,
    followed by the chunk-path-side additions
    (``_ROLE_BOUND_ZMQ_CONNECTORS`` set and ``_detect_local_ip``).
  - vllm_omni/distributed/omni_connectors/factory.py:
    keep both new ``register_connector`` calls
    (``MoriTransferEngineConnector`` and
    ``YuanrongTransferEngineConnector``), grouped after
    ``YuanrongConnector``.
  - vllm_omni/distributed/omni_connectors/connectors/mooncake_transfer_engine_connector.py:
    take main side and drop the 4-line "imported from utils/memory_pool"
    explanatory comment, matching what main settled on (and aligning
    with ``MoriTransferEngineConnector``, which doesn't carry that
    comment either).

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
@knitcapcat-amd

> Dual mode / chunk-transfer endpoints are not rank-aware: _inject_chunk_path_endpoints() derives ports as base_port + stage_id, with no TP rank term. If any stage runs with tensor_parallel_size > 1, same-stage workers will bind the same ZMQ port, and heterogeneous TP cannot route per-rank shards because the connector only has a single upstream sender_host/sender_zmq_port. I suggest to fail fast for TP>1 on this path or enforce that dual chunk transfer is TP=1-only.

Thanks for the review! Done in ef596e7: Mori's __init__ now raises NotImplementedError when tp_world_size > 1.

Per-edge support will land in a follow-up alongside the new
OmniConnectorModelRunnerMixin (RFC vllm-project#1546 §2.5).

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
@hsliuustc0106 hsliuustc0106 added the high priority high priority issue, needs to be done asap label May 28, 2026
@hsliuustc0106 hsliuustc0106 added this to the v0.22.0 milestone May 28, 2026
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
# See ``MoriTransferEngineConnector`` for the rationale.
tp_world_size = get_tp_world_size()
if tp_world_size > 1:
    raise NotImplementedError(
Contributor

This check should also be reverted here. Otherwise, models that use TP, such as Hunyuan Image, will fail.


good catch, reverted in 8a92219

knitcapcat and others added 3 commits May 29, 2026 02:22
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Replace the yaml link with an inline code path; mkdocs --strict
rejects links to non-documentation files and broke the RTD build.

Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Labels

high priority high priority issue, needs to be done asap

9 participants